datascan/bulk-creation-scripts/dataquality /main.py (111 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import click from pathlib import Path from lib import validateConfigFile from lib import validateCLI from lib import generateDataScanId from datascan import convertConfigToPayload from datascan import getDatascan from datascan import createDatascan from datascan import generateDataQualityRules from datascan import parseResponse class ListParamType(click.ParamType): name = 'list' def convert(self, value, param, ctx): try: bq_tables = value.split(',') return bq_tables except Exception as e: self.fail('Could not parse list. Expected format: project_id.location_id.datascan_id,project_id.location_id.datascan_id,...') @click.command() @click.option( "--gcp_project_id", help="GCP Project ID where the data quality scans will be created. " "If --config_path is provided, this option will be ignored. ", default=None, type=str, ) @click.option( "--location_id", help="GCP region where the data quality scans will be created. " "If --config_path is provided, this option will be ignored. ", default=None, type=str, ) @click.option( "--data_profile_ids", help="A list of existing Data Profile Ids. " "Format: project_id.location_id.datascan_id,project_id.location_id.datascan_id,... " "If --config_path is provided, this option will be ignored. ", type=ListParamType(), ) @click.option( "--config_path", help="Users can choose to provide the configuration via a YAML file by specifying the file path. ", type=click.Path(exists=True), default=None, ) def main( gcp_project_id: str, location_id: str, data_profile_ids: list, config_path: Path, ) -> None: if config_path: # validate config file print(f'Checking the configuration file located at {config_path}') configs = validateConfigFile(config_path) for config in configs: gcp_project_id = config['projectId'] location_id = config['locationId'] full_table_name = config['bqTable'] project_id = full_table_name.split('.')[0] dataset_id = full_table_name.split('.')[1] table_id = full_table_name.split('.')[2] # generate payload resource = { 'resource': f'//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}' } payload = convertConfigToPayload(config, resource) # generate datascan id datascan_id = generateDataScanId() print(f'Generated DataQuality scan Id for bq table {project_id}.{dataset_id}.{table_id}: {datascan_id}') # create datascan response = createDatascan( gcp_project_id, location_id, datascan_id, payload ) if response is not None: print(f"{datascan_id} has been created successfully.") print(f"LRO ID for {datascan_id}: ",response.name.split('/')[-1]) else: #validate CLI arguments print('Checking the CLI arguments') validateCLI(gcp_project_id, location_id, data_profile_ids) for data_profile_id in data_profile_ids: project_id = data_profile_id.split('.')[0] region_id = data_profile_id.split('.')[1] data_profile_id = data_profile_id.split('.')[2] # get datascan datascan = getDatascan(project_id, region_id, data_profile_id) if not datascan: print(f'Skipped Dataprofile Id - {gcp_project_id}.{location_id}.{data_profile_id}') else: data_quality_rules_list = generateDataQualityRules(project_id, region_id, data_profile_id) rules = parseResponse(data_quality_rules_list) resource = datascan.data config = {'dataQualitySpec': { 'rules': rules, }} cron = ' '.join(datascan.execution_spec.trigger.schedule.cron.split()[1:]) # generate payload if cron: config.setdefault('executionSpec', {}).setdefault('trigger', {}).setdefault('schedule', {})['cron'] = cron payload = convertConfigToPayload(config, resource) # generate datascan id datascan_id = generateDataScanId() print(f'Generated DataQuality scan Id for data_profile_id {data_profile_id}: {datascan_id}') # create datascan response = createDatascan( gcp_project_id, location_id, datascan_id, payload ) if response is not None: print(f"{datascan_id} has been created successfully.") print(f"LRO ID for {datascan_id}: ",response.name.split('/')[-1]) if __name__ == "__main__": main()